home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- '''message.py
-
- Classes:
- SEHeaderMessage - An SBHeaderMessage subclass extended for SpamExperts
- '''
- import re
- import sys
- import email
- import types
- from spambayes.message import SBHeaderMessage
- from spambayes.tokenizer import try_to_repair_damaged_base64
- from spamexperts.Options import options
-
class SEHeaderMessage(SBHeaderMessage):
    '''Message class that is cognizant of SpamExperts headers and the
    Spambayes headers.

    In addition to what SBHeaderMessage provides, instances carry a
    blocking state, the account they belong to, and the attributes uid,
    flags, folder_name and internaldate.  The Subject: header is notated
    as "[disposition] original subject".'''

    # Matches anything that looks like an HTML/XML tag; used by _strip_html.
    strip_tags_re = re.compile('<.*?>', re.IGNORECASE | re.DOTALL)

    def __init__(self, ID=None):
        '''Create the message and initialise the SpamExperts attributes.

        ID is passed straight through to SBHeaderMessage.__init__.'''
        SBHeaderMessage.__init__(self, ID)
        # NOTE(review): presumably the attribute names the base class
        # persists for each message - confirm against spambayes.message.
        self.stored_attributes = [
            'c',
            't',
            'block_state',
            'account',
            'date_modified',
            'internaldate',
            'flags',
            'folder_name',
            'uid',
        ]
        self.uid = None
        self.flags = None
        self.account = None
        self.folder_name = None
        self.block_state = None
        self.internaldate = None

    def setBlockState(self, state):
        '''Store the block state and flag the message as modified.'''
        self.block_state = state
        self.modified()

    def setAccount(self, account):
        '''Store the account this message belongs to for moving between
        blocked/delayed databases, and flag the message as modified.'''
        self.account = account
        self.modified()

    def rememberBlockingState(self, account, state):
        '''Store both the account and the blocking state for this message.'''
        self.setAccount(account)
        self.setBlockState(state)

    def getBlockingState(self):
        '''Return the blocking state as an (account, block_state) tuple.'''
        return (self.account, self.block_state)

    def setDisposition(self, prob):
        '''Do nothing; prob is ignored.

        NOTE(review): presumably this overrides the SBHeaderMessage
        implementation on purpose - confirm against spambayes.message.'''
        pass

    def addNotations(self):
        '''Notate the To: and Subject: headers with the disposition.

        The trained state (GetTrained) takes precedence: True maps to the
        configured spam string and False to the ham string.  When the
        message has not been trained (None), GetClassification() is used
        as the disposition instead.'''
        disposition = self.GetTrained()
        if disposition is None:
            disposition = self.GetClassification()
        elif disposition == True:
            disposition = options[('Headers', 'header_spam_string')]
        elif disposition == False:
            disposition = options[('Headers', 'header_ham_string')]

        self.notateTo(disposition)
        self.notateSubject(disposition)

    def notateSubject(self, disposition):
        '''Prefix the Subject: header with "[disposition] ".

        Nothing happens unless disposition is listed in the
        ('Headers', 'notate_subject') option, which may be a single string
        or a sequence of strings.  If the message has no Subject: header,
        one is created containing just "[disposition]".'''
        notate_subject = options[('Headers', 'notate_subject')]
        if isinstance(notate_subject, types.StringTypes):
            # A lone string must be wrapped, otherwise "in" would do a
            # substring test instead of a membership test.
            notate_subject = (notate_subject,)
        if disposition not in notate_subject:
            return

        try:
            self.replace_header('Subject',
                                '[%s] %s' % (disposition, self['Subject']))
        except KeyError:
            # replace_header raises KeyError when there is no such header.
            self['Subject'] = '[%s]' % (disposition,)

    def delNotations(self):
        '''If present, remove our notation from the Subject: header, and
        let SBHeaderMessage.delNotations handle the rest.

        At most one "[ham] ", "[spam] " or "[unsure] " style prefix (built
        from the configured header strings) is stripped, and only when the
        ('Headers', 'notate_subject') option is set.  A subject that
        carries no such prefix is left unchanged.'''
        subject = self['Subject']
        if not subject:
            SBHeaderMessage.delNotations(self)
            return None

        # NOTE(review): the subject is blanked while the base class runs,
        # presumably so that its own subject handling is skipped.
        self.replace_header('Subject', '')
        SBHeaderMessage.delNotations(self)

        # Start from the original text so that an un-notated subject is
        # put back rather than being left blank.
        restored = subject
        if options[('Headers', 'notate_subject')]:
            ham = '[%s] ' % (options[('Headers', 'header_ham_string')],)
            spam = '[%s] ' % (options[('Headers', 'header_spam_string')],)
            unsure = '[%s] ' % (options[('Headers', 'header_unsure_string')],)
            for prefix in (ham, spam, unsure):
                if subject.startswith(prefix):
                    # Only remove one prefix, maximum.
                    restored = subject[len(prefix):]
                    break
        self.replace_header('Subject', restored)

    def addHeaders(self, prob, clues):
        '''Add the custom headers by delegating to addSBHeaders.'''
        self.addSBHeaders(prob, clues)

    def currentHeaders(self):
        '''Return all current headers.'''
        return self.currentSBHeaders()

    def delHeaders(self):
        '''Remove all custom headers from this message.'''
        self.delSBHeaders()

    def _walkMultiPart(self, msg, content_type):
        '''Return the text of the first part of the specified type.

        msg must be a multipart message; nested multipart parts are
        searched depth-first.  The payload is decoded to unicode with the
        charset the part declares, falling back to latin-1 when no charset
        is declared, the charset is unknown, or the bytes do not decode.
        Returns '' when no part of that type (with content) is found.'''
        for part in msg.get_payload():
            if part.is_multipart():
                found = self._walkMultiPart(part, content_type)
                if found:
                    return found
                # Nothing in this sub-tree; keep looking at the siblings.
                continue

            if part.get_content_type() != content_type:
                continue

            payload = part.get_payload(decode=True)
            try:
                return unicode(payload,
                               part.get_content_charset() or 'latin-1')
            except (LookupError, UnicodeDecodeError):
                return unicode(payload, 'latin-1')

        return ''

    def _strip_html(self, content):
        '''Remove all HTML tags from the content, to produce a simple
        plain-text version.'''
        return self.strip_tags_re.sub('', content)

    def as_html(self):
        '''Return the body text of the message as unicode.

        For a multipart message this is the first text/plain part or, if
        there is none, the first text/html part with its tags stripped.
        For any other message it is the decoded payload as-is (no tag
        stripping).  The result is then passed through
        try_to_repair_damaged_base64 on a best-effort basis.'''
        if self.is_multipart():
            body = self._walkMultiPart(self, 'text/plain')
            if body == '':
                body = self._strip_html(
                    self._walkMultiPart(self, 'text/html'))
        else:
            body = self.get_payload(decode=True)
            try:
                body = unicode(body, self.get_content_charset() or 'latin-1')
            except (UnicodeDecodeError, LookupError):
                # Unknown charset or undecodable bytes: latin-1 accepts
                # every byte value, so this cannot fail the same way.
                body = unicode(body, 'latin-1')

        try:
            body = try_to_repair_damaged_base64(body)
        except UnicodeDecodeError:
            # Best effort only: keep the unrepaired text.
            pass

        return body

    def fingerprint_hash(self):
        '''Return an ID suitable for use with the fingerprint system.'''
        return hash(self)
-
-
-
def seheadermessage_from_string(s, _class = SEHeaderMessage, strict = False):
    '''Parse the message text in s and return the resulting message.

    The text is handed to email.message_from_string together with _class,
    which defaults to SEHeaderMessage.  The strict argument is accepted
    but is not used.'''
    message = email.message_from_string(s, _class)
    return message
-
-